package com.example.yilanchatserver.business.freechat;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.FluxSink;


@Slf4j
public class FuJiaSubscriber implements Subscriber<String>, Disposable {
    private final FluxSink<String> emitter;
    private Subscription subscription;




    public FuJiaSubscriber(FluxSink<String> emitter) {
        this.emitter = emitter;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String data) {
        log.info("FuJia返回数据：{}", data);
        try{
            JSONObject dataJson = JSONObject.parseObject(data);
            String content = dataJson.getString("data");
            String type = dataJson.getString("type");

            if("aidhSpeak".equals(type)){
                emitter.next("[Done]");
                emitter.complete();
            } else {
                emitter.next(content);
                subscription.request(1);
            }

        }catch (Exception e){
            log.error(e.toString());
            emitter.next("[Error]");
            emitter.complete();
        }


    }

    @Override
    public void onError(Throwable t) {
        log.error("FuJia返回数据异常：{}", t.getMessage());
        emitter.error(t);
    }

    @Override
    public void onComplete() {
        log.info("FuJia返回数据完成");
        emitter.next("[Done]");
        emitter.complete();
    }

    @Override
    public void dispose() {
        log.warn("FuJia返回数据关闭");
        emitter.complete();
    }

}
